package sf.database.jdbc.sql;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sf.common.exception.SmallOrmException;
import sf.common.log.LogContext;
import sf.common.log.OrmLog;
import sf.common.wrapper.Page;
import sf.database.OrmConfig;
import sf.database.dao.DBContext;
import sf.database.dbinfo.Feature;
import sf.database.dialect.DBDialect;
import sf.database.dialect.DBProperty;
import sf.database.dialect.db.MySqlDialect;
import sf.database.dialect.db.OracleDialect;
import sf.database.dialect.db.PostgreSqlDialect;
import sf.database.jdbc.handle.PageListHandler;
import sf.database.jdbc.handle.RowListHandler;
import sf.database.jdbc.handle.SingleRowHandler;
import sf.database.jdbc.rowmapper.MapRowMapper;
import sf.database.jdbc.rowmapper.RowMapper;
import sf.database.jdbc.rowmapper.RowMapperHelp;
import sf.database.jdbc.type.Jdbcs;
import sf.database.meta.MetaHolder;
import sf.database.support.DBMS;
import sf.database.util.DBUtils;
import sf.spring.util.Assert;
import sf.spring.util.CollectionUtils;
import sf.spring.util.LinkedCaseInsensitiveMap;
import sf.tools.StringUtils;

import javax.persistence.SqlResultSetMapping;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Stream;

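/**
 * JDBC implementation of {@link CrudSqlInf}: executes SQL through {@code PreparedStatement}s on a
 * caller-supplied {@link Connection}. The shared instance is obtained via {@link #getInstance()}.
 */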
public class CrudSqlImpl implements CrudSqlInf {
    /** SLF4J logger for this class. */
    private static final Logger LOGGER = LoggerFactory.getLogger(CrudSqlImpl.class);

    /**
     * Page size used when an iterator/stream query is executed page by page.
     */
    public static int ITERATOR_SIZE = 100;
    /** Shared result handler built from a {@code RowListHandler} over a {@code MapRowMapper}. */
    static ResultSetCallback<List<Map<String, Object>>> mapRsh = new RowListHandler<>(new MapRowMapper());
    /** Shared creator that prepares a statement with {@code Connection.prepareStatement(sql)}. */
    static DefaultPreparedStatement defaultPreparedStatement = new DefaultPreparedStatement();
    /** Shared creator that prepares a scroll-sensitive, updatable statement. */
    static ForUpdatePreparedStatement forUpdatePreparedStatement = new ForUpdatePreparedStatement();
    /** The single instance returned by {@code getInstance()}. */
    private static final CrudSqlImpl instance = new CrudSqlImpl();
    /** Creator used by the iterator/stream queries when the DB context does not supply one. */
    private final StreamPreparedStatement streamPs = new StreamPreparedStatement();

    /**
     * Private: instances are not created by callers; use {@code getInstance()}.
     */
    private CrudSqlImpl() {
    }

    /**
     * Returns the instance held in the static {@code instance} field.
     *
     * @return the shared {@code CrudSqlImpl}
     */
    public static CrudSqlImpl getInstance() {
        return CrudSqlImpl.instance;
    }

    /**
     * Logs the SQL text and its bound values at debug level.
     *
     * @param sql    the statement text
     * @param values the values bound to the statement
     */
    private void logMessage(String sql, List<Object> values) {
        // SLF4J only substitutes "{}" anchors; MessageFormat-style "{0}"/"{1}" would be printed
        // literally and the arguments silently dropped.
        LOGGER.debug("Executing SQL:[{}],values:{}", sql, values);
    }

    /**
     * Runs a query and returns the result produced by the shared {@code mapRsh} handler
     * (a {@code RowListHandler} over a {@code MapRowMapper}).
     * Delegates to the {@code select} overload that takes a {@code ResultSetCallback}.
     *
     * @param conn  connection to run the query on
     * @param sql   query text
     * @param paras values bound to the statement
     * @return the list of row maps produced by the handler
     */
    @Override
    public List<Map<String, Object>> select(Connection conn, String sql, Object... paras) {
        return select(conn, mapRsh, sql, paras);
    }

    /**
     * Queries rows and maps them with the row mapper derived from a named {@code SqlResultSetMapping}.
     *
     * @param conn             connection to run the query on
     * @param resultSetMapping key looked up in {@code MetaHolder.SQL_RESULT_SET_MAPPING_MAP}; must not be null
     * @param sql              query text
     * @param parameters       values bound to the statement
     * @return the result of a {@code RowListHandler} built on the mapping's row mapper
     * @throws RuntimeException if nothing is registered under {@code resultSetMapping}
     */
    @Override
    public List<?> selectList(Connection conn, String resultSetMapping, String sql, Object... parameters) {
        Assert.notNull(resultSetMapping, "resultSetMapping cannot be null.");
        final DBContext ctx = DBUtils.doGetDBContext(conn);
        final SqlResultSetMapping mapping = MetaHolder.SQL_RESULT_SET_MAPPING_MAP.get(resultSetMapping);
        if (mapping != null) {
            final RowMapper<?> mapper = RowMapperHelp.getBeanRowMapper(mapping, ctx);
            final ResultSetCallback<List<?>> listHandler = new RowListHandler(mapper);
            return select(conn, listHandler, sql, parameters);
        }
        throw new RuntimeException("No corresponding SqlResultSetMapping:" + resultSetMapping);
    }

    /**
     * Paged variant of the mapping-based list query: the SQL is first passed through the
     * dialect's {@code sqlPageList} with {@code start} and {@code limit}, then executed.
     *
     * @param conn             connection to run the query on
     * @param resultSetMapping key looked up in {@code MetaHolder.SQL_RESULT_SET_MAPPING_MAP}; must not be null
     * @param start            start value handed to the dialect
     * @param limit            limit value handed to the dialect
     * @param sql              query text before paging is applied
     * @param parameters       values bound to the statement
     * @return the result of a {@code RowListHandler} built on the mapping's row mapper
     * @throws RuntimeException if nothing is registered under {@code resultSetMapping}
     */
    @Override
    public List<?> selectList(Connection conn, String resultSetMapping, long start, int limit, String sql, Object... parameters) {
        Assert.notNull(resultSetMapping, "resultSetMapping cannot be null.");
        final DBContext ctx = DBUtils.doGetDBContext(conn);
        final SqlResultSetMapping mapping = MetaHolder.SQL_RESULT_SET_MAPPING_MAP.get(resultSetMapping);
        if (mapping == null) {
            throw new RuntimeException("No corresponding SqlResultSetMapping:" + resultSetMapping);
        }
        final String pagedSql = DBUtils.doGetDialect(conn, false)
                .sqlPageList(new StringBuilder(sql), start, limit)
                .toString();
        final RowMapper<?> mapper = RowMapperHelp.getBeanRowMapper(mapping, ctx);
        final ResultSetCallback<List<?>> listHandler = new RowListHandler(mapper);
        return select(conn, listHandler, pagedSql, parameters);
    }

    /**
     * Runs a query and maps the result to a single object using a named {@code SqlResultSetMapping}.
     * <p>
     * The row mapper is wrapped in a {@code SingleRowHandler}, the same handler the class-based
     * {@code selectOne} uses, so that one mapped object is returned. Previously a
     * {@code RowListHandler} was used here, which made this method hand back the whole list.
     *
     * @param conn             connection to run the query on
     * @param resultSetMapping key looked up in {@code MetaHolder.SQL_RESULT_SET_MAPPING_MAP}; must not be null
     * @param sql              query text
     * @param parameters       values bound to the statement
     * @return the value produced by the single-row handler
     * @throws RuntimeException if nothing is registered under {@code resultSetMapping}
     */
    @Override
    public Object selectOne(Connection conn, String resultSetMapping, String sql, Object... parameters) {
        Assert.notNull(resultSetMapping, "resultSetMapping cannot be null.");
        DBContext dbContext = DBUtils.doGetDBContext(conn);
        SqlResultSetMapping sqlResultSetMapping = MetaHolder.SQL_RESULT_SET_MAPPING_MAP.get(resultSetMapping);
        if (sqlResultSetMapping == null) {// no mapping registered under this name
            throw new RuntimeException("No corresponding SqlResultSetMapping:" + resultSetMapping);
        }
        RowMapper<?> rowMapper = RowMapperHelp.getBeanRowMapper(sqlResultSetMapping, dbContext);
        ResultSetCallback<?> rsh = new SingleRowHandler<>(rowMapper);
        return select(conn, rsh, sql, parameters);
    }

    /**
     * Runs a query and maps the result to one {@code beanClass} value via a {@code SingleRowHandler}.
     *
     * @param conn       connection to run the query on
     * @param beanClass  target type; must not be null
     * @param sql        query text
     * @param parameters values bound to the statement
     * @param <T>        target type
     * @return the value produced by the single-row handler
     */
    @Override
    public <T> T selectOne(Connection conn, Class<T> beanClass, String sql, Object... parameters) {
        Assert.notNull(beanClass, "beanClass cannot be null.");
        final RowMapper<T> mapper = RowMapperHelp.getRowMapper(beanClass, DBUtils.doGetDBContext(conn));
        final ResultSetCallback<T> singleHandler = new SingleRowHandler<>(mapper);
        return select(conn, singleHandler, sql, parameters);
    }

    /**
     * Runs a query and maps the rows to {@code beanClass} values via a {@code RowListHandler}.
     *
     * @param conn       connection to run the query on
     * @param beanClass  element type; must not be null
     * @param sql        query text
     * @param parameters values bound to the statement
     * @param <T>        element type
     * @return the list produced by the row-list handler
     */
    @Override
    public <T> List<T> selectList(Connection conn, Class<T> beanClass, String sql, Object... parameters) {
        Assert.notNull(beanClass, "beanClass cannot be null.");
        final RowMapper<T> mapper = RowMapperHelp.getRowMapper(beanClass, DBUtils.doGetDBContext(conn));
        final ResultSetCallback<List<T>> listHandler = new RowListHandler<>(mapper);
        return select(conn, listHandler, sql, parameters);
    }

    /**
     * Paged variant of the class-based list query: the SQL is first passed through the
     * dialect's {@code sqlPageList} with {@code start} and {@code limit}, then executed.
     *
     * @param conn       connection to run the query on
     * @param beanClass  element type; must not be null
     * @param start      start value handed to the dialect
     * @param limit      limit value handed to the dialect
     * @param sql        query text before paging is applied
     * @param parameters values bound to the statement
     * @param <T>        element type
     * @return the list produced by the row-list handler
     */
    @Override
    public <T> List<T> selectList(Connection conn, Class<T> beanClass, long start, int limit, String sql, Object... parameters) {
        Assert.notNull(beanClass, "beanClass cannot be null.");
        final DBContext ctx = DBUtils.doGetDBContext(conn);
        final String pagedSql = DBUtils.doGetDialect(conn, false)
                .sqlPageList(new StringBuilder(sql), start, limit)
                .toString();
        final RowMapper<T> mapper = RowMapperHelp.getRowMapper(beanClass, ctx);
        final ResultSetCallback<List<T>> listHandler = new RowListHandler<>(mapper);
        return select(conn, listHandler, pagedSql, parameters);
    }

    /**
     * Runs a query and returns the single-row result mapped to {@code Object[]}.
     *
     * @param conn       connection to run the query on
     * @param sql        query text
     * @param parameters values bound to the statement
     * @return the result of {@code selectOne} with {@code Object[].class} as target type
     */
    @Override
    public Object[] selectArray(Connection conn, String sql, Object... parameters) {
        final Object[] row = selectOne(conn, Object[].class, sql, parameters);
        return row;
    }

    /**
     * Queries one page of results. The count statement is derived from {@code sql} by
     * {@code DBUtils.getSmartSqlSelectCount} and the same parameters are used for both
     * the count and the list statement.
     *
     * @param conn       connection to run the queries on
     * @param start      start value of the page
     * @param limit      page size
     * @param beanClass  element type; must not be null
     * @param sql        list query text; must not be null
     * @param parameters values bound to both statements
     * @param <T>        element type
     * @return the page built by {@code selectPageRaw}
     */
    @Override
    public <T> Page<T> selectPage(Connection conn, long start, int limit, Class<T> beanClass, String sql, Object... parameters) {
        Assert.notNull(beanClass, "beanClass cannot be null.");
        Assert.notNull(sql, "sql cannot be null.");
        final String derivedCountSql = DBUtils.getSmartSqlSelectCount(sql);
        return selectPageRaw(conn, start, limit, beanClass, derivedCountSql, parameters, sql, parameters);
    }

    /**
     * Queries one page using separate count and list statements with the
     * {@code PageStrategy.onlyList} strategy.
     *
     * @param conn       connection to run the queries on
     * @param start      start value of the page
     * @param limit      page size
     * @param beanClass  element type
     * @param countSql   count statement
     * @param countParas values bound to the count statement
     * @param listSql    list statement
     * @param listParas  values bound to the list statement
     * @param <T>        element type
     * @return the page built by the strategy-taking overload
     */
    @Override
    public <T> Page<T> selectPageRaw(Connection conn, long start, int limit, Class<T> beanClass, String countSql, Object[] countParas, String listSql, Object[] listParas) {
        final PageStrategy defaultStrategy = PageStrategy.onlyList;
        return selectPageRaw(conn, start, limit, beanClass, countSql, countParas, listSql, listParas, defaultStrategy);
    }

    /**
     * Queries one page: runs the count statement first and, when the count is positive,
     * fetches the rows according to {@code strategy}.
     * <ul>
     * <li>{@code onlyList}: {@code listSql} carries no paging; it is passed through the dialect's
     * {@code sqlPageList} before execution.</li>
     * <li>{@code fake}: {@code listSql} is executed as is and a {@code PageListHandler} configured
     * with {@code start}/{@code limit} reads the result set.</li>
     * <li>{@code hasOffsetLimit}: {@code listSql} already contains its paging and is executed as is.</li>
     * </ul>
     *
     * @param conn       connection to run the queries on
     * @param start      start value of the page
     * @param limit      page size
     * @param beanClass  element type; must not be null
     * @param countSql   count statement; must have text
     * @param countParas values bound to the count statement
     * @param listSql    list statement; must have text
     * @param listParas  values bound to the list statement
     * @param strategy   how the rows are fetched; must not be null
     * @param <T>        element type
     * @return a page holding the count and the fetched rows (an empty list when the count is 0)
     * @throws SmallOrmException if the dialect yields a blank paging statement for {@code onlyList}
     */
    @Override
    public <T> Page<T> selectPageRaw(Connection conn, long start, int limit, Class<T> beanClass, String countSql, Object[] countParas, String listSql, Object[] listParas, PageStrategy strategy) {
        Assert.notNull(beanClass, "beanClass cannot be null.");
        Assert.hasText(countSql, "count sql cannot be null,please check you sql");
        Assert.hasText(listSql, "list sql cannot be null,please check you sql");
        Assert.notNull(strategy, "strategy cannot be null.");

        Long total = selectOne(conn, Long.class, countSql, countParas);
        if (total == null) {
            total = 0L;
        }
        final Page<T> page = new Page<T>(start, limit, total);
        List<T> rows = Collections.emptyList();
        if (total > 0) {
            if (strategy == PageStrategy.onlyList) {
                // The list statement has no paging of its own: let the dialect add it.
                final String pageSql = DBUtils.doGetDialect(conn, false)
                        .sqlPageList(new StringBuilder(listSql), start, limit)
                        .toString();
                if (!StringUtils.isNotBlank(pageSql)) {
                    throw new SmallOrmException("not find page sql,please check you sql");
                }
                rows = selectList(conn, beanClass, pageSql, listParas);
            } else if (strategy == PageStrategy.fake) {
                // Emulated paging: the handler positions itself inside the full result set.
                final DBContext ctx = DBUtils.doGetDBContext(conn);
                final RowMapper<T> mapper = RowMapperHelp.getRowMapper(beanClass, ctx);
                final PageListHandler<T> pageHandler = new PageListHandler<>(mapper);
                pageHandler.setFirstResult((int) start);
                pageHandler.setMaxResults(limit);
                // SQLite does not support scrollable result sets, so the statement is prepared
                // without ResultSet.TYPE_SCROLL_INSENSITIVE / ResultSet.CONCUR_READ_ONLY.
                rows = selectResultSet(conn, Connection::prepareStatement, pageHandler, listSql, listParas);
            } else if (strategy == PageStrategy.hasOffsetLimit) {
                // The statement already contains offset/limit: run it unchanged.
                rows = selectList(conn, beanClass, listSql, listParas);
            }
        }
        page.setList(rows);
        return page;
    }

    /**
     * Prepares {@code sql}, binds {@code parameters}, executes the query and hands the
     * result set to {@code rsh}. The statement and result set are closed before returning.
     *
     * @param conn       connection to run the query on
     * @param rsh        callback that turns the result set into the return value; must not be null
     * @param sql        query text; must not be null
     * @param parameters values bound to the statement
     * @param <T>        type produced by the callback
     * @return the callback's result
     * @throws SmallOrmException wrapping any {@code SQLException} raised while running the query
     */
    public <T> T select(Connection conn, ResultSetCallback<T> rsh, String sql, Object... parameters) {
        Assert.notNull(rsh, "rsh cannot be null.");
        Assert.notNull(sql, "sql cannot be null.");
        final LogContext logCtx = OrmLog.commonLog(null, sql, parameters);
        PreparedStatement statement = null;
        ResultSet resultSet = null;
        try {
            statement = conn.prepareStatement(sql);
            CommonSql.fillArrayStatement(statement, parameters);
            resultSet = statement.executeQuery();
            T mapped = rsh.callback(resultSet);
            OrmLog.resultSqlLog(null, logCtx, mapped, () -> OrmLog.getAutoCommit(conn));
            return mapped;
        } catch (SQLException e) {
            OrmLog.resultSqlLog(null, logCtx, e);
            throw new SmallOrmException(e);
        } finally {
            DBUtils.close(resultSet);
            DBUtils.close(statement);
        }
    }

    /**
     * Wraps {@code sql} and {@code parameters} in a {@code SQLContext} and runs the query through
     * the context-based {@code select} with the default statement creator.
     * <p>
     * Each non-null parameter gets the handler returned by {@code Jdbcs.getDB2BeanMappingType} for
     * its runtime class. A {@code null} parameter is added without a handler instead of failing
     * with a {@code NullPointerException} on {@code getClass()}.
     * NOTE(review): this assumes {@code CommonSql.fillSQLStatement} accepts a parameter without a
     * handler, as it is given by the iterator queries that use {@code new SQLParameter(value)} — confirm.
     *
     * @param conn       connection to run the query on
     * @param rsh        callback that turns the result set into the return value; must not be null
     * @param sql        query text; must not be null
     * @param parameters values bound to the statement; may be null or empty, elements may be null
     * @param <T>        type produced by the callback
     * @return the callback's result
     */
    public <T> T selectByContext(Connection conn, ResultSetCallback<T> rsh, String sql, Object... parameters) {
        Assert.notNull(rsh, "rsh cannot be null.");
        Assert.notNull(sql, "sql cannot be null.");
        SQLContext context = new SQLContext();
        context.setSql(sql);
        if (parameters != null && parameters.length > 0) {
            List<SQLParameter> sqlParameters = new ArrayList<>(parameters.length);
            for (Object o : parameters) {
                SQLParameter parameter = new SQLParameter();
                parameter.setValue(o);
                if (o != null) {
                    // The handler is chosen from the runtime class, which a null value does not have.
                    parameter.setHandler(Jdbcs.getDB2BeanMappingType(o.getClass()));
                }
                sqlParameters.add(parameter);
            }
            context.setParas(sqlParameters);
        }
        return select(conn, defaultPreparedStatement, rsh, context);
    }

    /**
     * Creates a statement for the context's SQL through {@code psc}, binds the context's
     * parameters, executes the query and hands the result set to {@code rsh}.
     * The statement and result set are closed before returning.
     *
     * @param conn    connection to run the query on
     * @param psc     creates the prepared statement; must not be null
     * @param rsh     callback that turns the result set into the return value; must not be null
     * @param context carries the SQL text (must not be null) and its parameters; must not be null
     * @param <T>     type produced by the callback
     * @return the callback's result
     * @throws SmallOrmException wrapping any exception raised while running the query
     */
    public <T> T select(Connection conn, PreparedStatementCreator psc, ResultSetCallback<T> rsh, SQLContext context) {
        Assert.notNull(psc, "PreparedStatementCreator must not be null");
        Assert.notNull(rsh, "rsh cannot be null.");
        Assert.notNull(context, "context cannot be null.");
        Assert.notNull(context.getSql(), "sql cannot be null.");
        final LogContext logCtx = OrmLog.commonLog(null, context.getSql(), context.getValues());
        PreparedStatement statement = null;
        ResultSet resultSet = null;
        try {
            statement = psc.createPreparedStatement(conn, context.getSql());
            CommonSql.fillSQLStatement(statement, context.getParas());
            resultSet = statement.executeQuery();
            T mapped = rsh.callback(resultSet);
            OrmLog.resultSqlLog(null, logCtx, mapped, () -> OrmLog.getAutoCommit(conn));
            return mapped;
        } catch (Exception e) {
            OrmLog.resultSqlLog(null, logCtx, e);
            throw new SmallOrmException(e);
        } finally {
            DBUtils.close(resultSet);
            DBUtils.close(statement);
        }
    }

    /**
     * Wraps {@code sql} and {@code parameters} in a {@code SQLContext} and runs the query through
     * the context-based {@code selectForUpdate}.
     * <p>
     * Each non-null parameter gets the handler returned by {@code Jdbcs.getDB2BeanMappingType} for
     * its runtime class. A {@code null} parameter is added without a handler instead of failing
     * with a {@code NullPointerException} on {@code getClass()}.
     * NOTE(review): this assumes {@code CommonSql.fillSQLStatement} accepts a parameter without a
     * handler, as it is given by the iterator queries that use {@code new SQLParameter(value)} — confirm.
     *
     * @param conn       connection to run the query on
     * @param callback   callback that receives the result set; must not be null
     * @param sql        query text; must not be null
     * @param parameters values bound to the statement; may be null or empty, elements may be null
     * @param <T>        type produced by the callback
     * @return the callback's result
     */
    @Override
    public <T> T selectForUpdate(Connection conn, ResultSetCallback<T> callback, String sql, Object... parameters) {
        Assert.notNull(callback, "callback cannot be null.");
        Assert.notNull(sql, "sql cannot be null.");
        SQLContext context = new SQLContext();
        context.setSql(sql);
        if (parameters != null && parameters.length > 0) {
            List<SQLParameter> sqlParameters = new ArrayList<>(parameters.length);
            for (Object o : parameters) {
                SQLParameter parameter = new SQLParameter();
                parameter.setValue(o);
                if (o != null) {
                    // The handler is chosen from the runtime class, which a null value does not have.
                    parameter.setHandler(Jdbcs.getDB2BeanMappingType(o.getClass()));
                }
                sqlParameters.add(parameter);
            }
            context.setParas(sqlParameters);
        }
        return selectForUpdate(conn, callback, context);
    }

    /**
     * Runs the context's query with the shared {@code forUpdatePreparedStatement} creator and
     * passes the result set to {@code callback}.
     *
     * @param conn     connection to run the query on
     * @param callback callback that receives the result set
     * @param context  carries the SQL text and its parameters
     * @param <T>      type produced by the callback
     * @return the callback's result
     */
    @Override
    public <T> T selectForUpdate(Connection conn, ResultSetCallback<T> callback, SQLContext context) {
        final PreparedStatementCreator updatableCreator = forUpdatePreparedStatement;
        return select(conn, updatableCreator, callback, context);
    }

    /**
     * Iterator query: wraps {@code sql} and {@code parameters} in a {@code SQLContext} and
     * delegates to the context-based overload.
     *
     * @param conn       connection to run the query on
     * @param ormIt      consumer that receives the iterable over the mapped rows; must not be null
     * @param beanClass  element type
     * @param usePage    whether the iteration is implemented with paged queries
     * @param sql        query text; must not be null
     * @param parameters query parameters; may be null or empty
     * @param <T>        element type
     */
    @Override
    public <T> void selectIterator(Connection conn, Consumer<Iterable<T>> ormIt, Class<T> beanClass, boolean usePage, String sql, Object... parameters) {
        Assert.notNull(ormIt, "ormIt cannot be null.");
        Assert.notNull(sql, "sql cannot be null.");
        final SQLContext queryContext = new SQLContext();
        queryContext.setSql(sql);
        final boolean hasParameters = parameters != null && parameters.length > 0;
        if (hasParameters) {
            final List<SQLParameter> bound = new ArrayList<>();
            for (Object value : parameters) {
                bound.add(new SQLParameter(value));
            }
            queryContext.setParas(bound);
        }
        selectIterator(conn, ormIt, beanClass, usePage, queryContext);
    }

    /**
     * Iterator query driven by a {@code SQLContext}. The statement creator comes from the
     * connection's {@code DBContext} when it supplies one, otherwise the instance's
     * {@code streamPs} is used. {@code ormIt} is invoked with a {@code ResultSetIteration}
     * bound to the open result set.
     *
     * @param conn      connection to run the query on
     * @param ormIt     consumer that receives the iterable over the mapped rows; must not be null
     * @param beanClass element type used to obtain the row mapper
     * @param usePage   whether the iteration is implemented with paged queries
     * @param context   carries the SQL text (must not be null) and its parameters; must not be null
     * @param <T>       element type
     */
    @Override
    public <T> void selectIterator(Connection conn, Consumer<Iterable<T>> ormIt, Class<T> beanClass, boolean usePage, SQLContext context) {
        Assert.notNull(ormIt, "ormIt cannot be null.");
        // The message now names the argument that is actually checked (it used to say "sql").
        Assert.notNull(context, "context cannot be null.");
        Assert.notNull(context.getSql(), "sql cannot be null.");
        DBContext dbContext = DBUtils.doGetDBContext(conn);
        PreparedStatementCreator creator = streamPs;
        if (dbContext != null && dbContext.getStatementCreator() != null) {
            creator = dbContext.getStatementCreator();
        }
        ResultSetIteration<T> it = new ResultSetIteration<>();
        it.rm = RowMapperHelp.getRowMapper(beanClass, dbContext);
        // The callback ignores the raw result set: the iteration object is bound to it by the
        // protected selectIterator before the callback runs.
        ResultSetCallback<T> rsc = rs -> {
            ormIt.accept(it);
            return null;
        };
        selectIterator(conn, creator, it, rsc, usePage, context);
    }

    /**
     * Stream query: wraps {@code sql} and {@code parameters} in a {@code SQLContext} and
     * delegates to the context-based overload.
     *
     * @param conn       connection to run the query on
     * @param ormStream  consumer that receives the stream of mapped rows; must not be null
     * @param beanClass  element type
     * @param usePage    whether the iteration is implemented with paged queries
     * @param sql        query text; must not be null
     * @param parameters query parameters; may be null or empty
     * @param <T>        element type
     */
    @Override
    public <T> void selectStream(Connection conn, Consumer<Stream<T>> ormStream, Class<T> beanClass, boolean usePage, String sql, Object... parameters) {
        // The message now names this method's parameter (it used to say "ormIt").
        Assert.notNull(ormStream, "ormStream cannot be null.");
        Assert.notNull(sql, "sql cannot be null.");
        SQLContext context = new SQLContext();
        context.setSql(sql);
        if (parameters != null && parameters.length > 0) {
            List<SQLParameter> list = new ArrayList<>(parameters.length);
            for (Object obj : parameters) {
                list.add(new SQLParameter(obj));
            }
            context.setParas(list);
        }
        selectStream(conn, ormStream, beanClass, usePage, context);
    }

    /**
     * Stream query driven by a {@code SQLContext}. The statement creator comes from the
     * connection's {@code DBContext} when it supplies one, otherwise the instance's
     * {@code streamPs} is used. {@code ormStream} is invoked with the stream obtained from a
     * {@code ResultSetIteration} bound to the open result set.
     *
     * @param conn      connection to run the query on
     * @param ormStream consumer that receives the stream of mapped rows; must not be null
     * @param beanClass element type used to obtain the row mapper
     * @param usePage   whether the iteration is implemented with paged queries
     * @param context   carries the SQL text (must not be null) and its parameters; must not be null
     * @param <T>       element type
     */
    @Override
    public <T> void selectStream(Connection conn, Consumer<Stream<T>> ormStream, Class<T> beanClass, boolean usePage, SQLContext context) {
        // Both messages now name the argument that is actually checked
        // (they used to say "ormIt" and "sql").
        Assert.notNull(ormStream, "ormStream cannot be null.");
        Assert.notNull(context, "context cannot be null.");
        Assert.notNull(context.getSql(), "sql cannot be null.");
        DBContext dbContext = DBUtils.doGetDBContext(conn);
        PreparedStatementCreator creator = streamPs;
        if (dbContext != null && dbContext.getStatementCreator() != null) {
            creator = dbContext.getStatementCreator();
        }
        ResultSetIteration<T> it = new ResultSetIteration<>();
        it.rm = RowMapperHelp.getRowMapper(beanClass, dbContext);
        // The callback ignores the raw result set: the iteration object is bound to it by the
        // protected selectIterator before the callback runs.
        ResultSetCallback<T> rsc = rs -> {
            ormStream.accept(it.stream());
            return null;
        };
        selectIterator(conn, creator, it, rsc, usePage, context);
    }

    /**
     * Core loop behind the iterator and stream queries.
     * <p>
     * Without paging the context's SQL is executed once on a statement created by
     * {@code psCreator}. With paging the SQL is rewritten by the dialect's {@code sqlPageList}
     * for successive windows of {@code ITERATOR_SIZE} rows, each executed on a plain
     * {@code conn.prepareStatement} (so {@code psCreator} is not used in that mode); the loop
     * stops once {@code it.rowNum} after the callback is below the page size.
     * For every execution {@code it} is reset and bound to the new result set before
     * {@code rsc} is invoked; statement and result set are closed afterwards.
     *
     * @param conn      connection to run the query on
     * @param psCreator creates the statement when paging is off
     * @param it        iteration object that is re-bound to each result set; must not be null
     * @param rsc       callback invoked once per executed statement; must not be null
     * @param usePage   whether the iteration is implemented with paged queries
     * @param context   carries the SQL text (must not be null) and its parameters; must not be null
     * @param <T>       element type
     * @throws SmallOrmException wrapping any exception raised while preparing, executing or consuming
     */
    protected <T> void selectIterator(Connection conn, PreparedStatementCreator psCreator, ResultSetIteration<T> it, ResultSetCallback<T> rsc, boolean usePage, SQLContext context) {
        Assert.notNull(it, "it cannot be null.");
        Assert.notNull(rsc, "rsc cannot be null.");
        Assert.notNull(context, "context cannot be null.");
        Assert.notNull(context.getSql(), "sql cannot be null.");
        // number of rows requested per page
        int iteratorSize = ITERATOR_SIZE;
        long start = 0;// start value of the current page
        boolean doing = true;
        String sql = context.getSql();
        while (doing) {
            if (usePage) {
                // Always rewrite from the original SQL, not from the previous page's statement.
                sql = DBUtils.doGetDialect(conn, false).sqlPageList(new StringBuilder(context.getSql()), start, iteratorSize).toString();
            } else {
                // Without paging there is exactly one pass through the loop.
                doing = false;
            }
            PreparedStatement ps = null;
            ResultSet rs = null;
            LogContext log = OrmLog.commonLog(null, sql, context.toObjectArray());
            try {
                if (usePage) {
                    ps = conn.prepareStatement(sql);
                } else {
                    ps = psCreator.createPreparedStatement(conn, sql);
                }
                CommonSql.fillSQLStatement(ps, context.getParas());
                rs = ps.executeQuery();
                // Re-bind the iteration object to this execution's result set.
                it.reset();
                it.rs = rs;
                it.rsmd = rs.getMetaData();
                rsc.callback(rs);
                // A short (or empty) page means there is nothing left to fetch.
                // NOTE(review): relies on the consumer having advanced the iteration so that
                // rowNum reflects the rows of this page — confirm in ResultSetIteration.
                if (it.rowNum == 0 || it.rowNum < iteratorSize) {
                    doing = false;
                }
                if (usePage && doing) {
                    start += iteratorSize;
                }
            } catch (Exception e) {
                throw new SmallOrmException(e);
            } finally {
                DBUtils.close(rs);
                DBUtils.close(ps);
                // NOTE(review): this result log also runs on the failure path; the exception
                // itself is not passed to OrmLog here, unlike in the other query methods.
                OrmLog.resultSqlLog(null, log, null, () -> OrmLog.getAutoCommit(conn));
            }
        }
    }

    /**
     * Runs a query on a statement created with {@code Connection.prepareStatement(sql)} and
     * passes the result set to {@code callback}.
     *
     * @param conn       connection to run the query on
     * @param callback   callback that receives the result set
     * @param sql        query text
     * @param parameters values bound to the statement
     * @param <T>        type produced by the callback
     * @return the callback's result
     */
    @Override
    public <T> T selectResultSet(Connection conn, ResultSetCallback<T> callback, String sql, Object... parameters) {
        final PreparedStatementCreator plainCreator = Connection::prepareStatement;
        return selectResultSet(conn, plainCreator, callback, sql, parameters);
    }

    /**
     * Creates a statement for {@code sql} through {@code psc}, binds {@code parameters},
     * executes the query and passes the result set to {@code callback}.
     * The statement and result set are closed before returning.
     *
     * @param conn       connection to run the query on
     * @param psc        creates the prepared statement; must not be null
     * @param callback   callback that receives the result set; must not be null
     * @param sql        query text; must not be null
     * @param parameters values bound to the statement
     * @param <T>        type produced by the callback
     * @return the callback's result
     * @throws SmallOrmException wrapping any exception raised while running the query
     */
    @Override
    public <T> T selectResultSet(Connection conn, PreparedStatementCreator psc, ResultSetCallback<T> callback, String sql, Object... parameters) {
        Assert.notNull(callback, "callback cannot be null.");
        Assert.notNull(psc, "PreparedStatementCreator cannot be null.");
        Assert.notNull(sql, "sql cannot be null.");
        final LogContext logCtx = OrmLog.commonLog(null, sql, parameters);
        PreparedStatement statement = null;
        ResultSet resultSet = null;
        try {
            statement = psc.createPreparedStatement(conn, sql);
            CommonSql.fillArrayStatement(statement, parameters);
            resultSet = statement.executeQuery();
            T produced = callback.callback(resultSet);
            OrmLog.resultSqlLog(null, logCtx, produced, () -> OrmLog.getAutoCommit(conn));
            return produced;
        } catch (Exception e) {
            OrmLog.resultSqlLog(null, logCtx, e);
            throw new SmallOrmException(e);
        } finally {
            DBUtils.close(resultSet);
            DBUtils.close(statement);
        }
    }

    /**
     * Invokes {@code action} with the given connection.
     *
     * @param con    connection passed to the callback
     * @param action callback to run; must not be null
     * @param <T>    type produced by the callback
     * @return the callback's result
     * @throws SmallOrmException wrapping a {@code SQLException} thrown by the callback
     */
    @Override
    public <T> T execute(Connection con, ConnectionCallback<T> action) {
        Assert.notNull(action, "Callback object must not be null");
        try {
            final T outcome = action.call(con);
            return outcome;
        } catch (SQLException sqlFailure) {
            throw new SmallOrmException(sqlFailure);
        }
    }

    /**
     * Prepares {@code sql}, binds {@code parameters} and runs it with {@code executeUpdate}.
     * The statement is closed before returning.
     *
     * @param conn       connection to run the statement on
     * @param sql        statement text; must not be null
     * @param parameters values bound to the statement
     * @return the value returned by {@code executeUpdate}
     * @throws SmallOrmException wrapping any exception raised while running the statement
     */
    @Override
    public int execute(Connection conn, String sql, Object... parameters) {
        Assert.notNull(sql, "sql cannot be null.");
        final LogContext logCtx = OrmLog.commonLog(null, sql, parameters);
        PreparedStatement statement = null;
        try {
            statement = conn.prepareStatement(sql);
            CommonSql.fillArrayStatement(statement, parameters);
            int affected = statement.executeUpdate();
            OrmLog.resultSqlLog(null, logCtx, affected, () -> OrmLog.getAutoCommit(conn));
            return affected;
        } catch (Exception e) {
            OrmLog.resultSqlLog(null, logCtx, e);
            throw new SmallOrmException(e);
        } finally {
            DBUtils.closeQuietly(statement);
        }
    }

    /**
     * Runs an arbitrary statement with {@code PreparedStatement.execute()}.
     * When the statement yields a result set it is passed to {@code callback} (if one is given)
     * and the callback's value is returned; otherwise the update count is returned.
     * The statement and any result set are closed before returning.
     *
     * @param conn       connection to run the statement on
     * @param psc        creates the prepared statement
     * @param setter     binds {@code parameters} to the statement
     * @param callback   receives the result set; may be null, in which case a result set is ignored
     * @param sql        statement text; must not be null
     * @param parameters value handed to {@code setter}
     * @param <T>        type produced by the callback
     * @return the callback's result, the update count, or null when a result set was not consumed
     * @throws SmallOrmException wrapping any exception raised while running the statement
     */
    @Override
    public <T> Object execute(Connection conn, PreparedStatementCreator psc, PreparedStatementSetter setter, ResultSetCallback<T> callback, String sql, Object parameters) {
        Assert.notNull(sql, "sql cannot be null.");
        final LogContext logCtx = OrmLog.commonLog(null, sql, parameters);
        PreparedStatement statement = null;
        ResultSet resultSet = null;
        try {
            statement = psc.createPreparedStatement(conn, sql);
            setter.setValues(statement, parameters);
            Object outcome = null;
            final boolean producedResultSet = statement.execute();
            if (!producedResultSet) {
                outcome = statement.getUpdateCount();
            } else {
                resultSet = statement.getResultSet();
                if (callback != null) {
                    outcome = callback.callback(resultSet);
                }
            }
            OrmLog.resultSqlLog(null, logCtx, outcome, () -> OrmLog.getAutoCommit(conn));
            return outcome;
        } catch (Exception e) {
            OrmLog.resultSqlLog(null, logCtx, e);
            throw new SmallOrmException(e);
        } finally {
            if (resultSet != null) {
                DBUtils.closeQuietly(resultSet);
            }
            DBUtils.closeQuietly(statement);
        }
    }

    /**
     * Insert without reading generated keys: delegates to
     * the varargs {@code execute(conn, sql, parameters)}.
     *
     * @param conn       connection to run the statement on
     * @param sql        insert statement
     * @param parameters values bound to the statement
     * @return the value returned by {@code execute}
     */
    @Override
    public int insertFast(Connection conn, String sql, Object... parameters) {
        final int affected = execute(conn, sql, parameters);
        return affected;
    }

    /**
     * Insert that asks for generated keys: delegates to the six-argument {@code execute}
     * with {@code insertFast} set to {@code false}.
     *
     * @param conn       connection to run the statement on
     * @param pkeys      key column names
     * @param keyValues  map that receives the generated key values
     * @param sql        insert statement
     * @param parameters values bound to the statement
     * @return the value returned by {@code execute}
     */
    @Override
    public int insert(Connection conn, List<String> pkeys, Map<String, Object> keyValues, String sql, Object... parameters) {
        final boolean skipGeneratedKeys = false;
        return execute(conn, sql, parameters, skipGeneratedKeys, pkeys, keyValues);
    }

    /**
     * Runs a single statement through {@code executeBatch} with a batch size of 1 and copies
     * the first generated-key row, if any, into {@code keyValues}.
     * <p>
     * The {@code insertFast} flag is forwarded to {@code executeBatch}; previously it was
     * ignored and {@code false} was always passed, so generated keys were requested even when
     * the caller asked for a fast insert.
     *
     * @param conn       connection to run the statement on
     * @param sql        statement text
     * @param parameters values bound to the statement
     * @param insertFast when true, generated keys are not requested
     * @param pkeys      key column names
     * @param keyValues  receives the generated key values; may be null
     * @return the first element of the row-count array returned by {@code executeBatch}, or 0 if it is empty
     */
    @Override
    public int execute(Connection conn, String sql, Object[] parameters, boolean insertFast, List<String> pkeys, Map<String, Object> keyValues) {
        List<Object[]> objects = Collections.singletonList(parameters);
        List<Map<String, Object>> mapList = new ArrayList<>(2);
        int[] rows = executeBatch(conn, sql, objects, insertFast, 1, pkeys, mapList);
        if (keyValues != null && !mapList.isEmpty()) {
            keyValues.putAll(mapList.get(0));
        }
        if (rows != null && rows.length > 0) {
            return rows[0];
        }
        return 0;
    }

    /**
     * Batch execution with defaults: fast mode (no generated keys), batch size 100,
     * no key columns and no key-value collector.
     *
     * @param conn       connection to run the statement on
     * @param sql        statement text
     * @param parameters one parameter array per execution
     * @return the row counts returned by the full {@code executeBatch}
     */
    @Override
    public int[] executeBatch(Connection conn, String sql, List<Object[]> parameters) {
        final boolean fastInsert = true;
        final int defaultBatchSize = 100;
        return executeBatch(conn, sql, parameters, fastInsert, defaultBatchSize, null, null);
    }

    /**
     * Executes {@code sql} once per entry of {@code parameters}, optionally collecting generated keys.
     * <p>
     * With zero or one parameter set the statement is run with {@code executeUpdate}; with more,
     * the sets are added to a JDBC batch that is flushed every {@code batchSize} entries and once
     * more at the end for the remainder. Generated keys are collected into {@code keyValues} only
     * when {@code pkeys} is non-empty, the SQL text contains "insert" (case-insensitive),
     * {@code insertFast} is false, {@code keyValues} is non-null and the dialect is not NoSQL.
     *
     * @param conn       connection to run the statement on
     * @param sql        statement text; must not be null
     * @param parameters one parameter array per execution; may be null or empty
     * @param insertFast when true, generated keys are not requested
     * @param batchSize  number of batched entries per flush; must be at least 1
     * @param pkeys      key column names used when requesting generated keys
     * @param keyValues  receives one map per generated key row; may be null
     * @return the row counts, one per parameter set (a single element when there were none)
     * @throws IllegalArgumentException if {@code batchSize} is less than 1
     * @throws SmallOrmException        wrapping any exception raised while running the statement
     */
    @Override
    public int[] executeBatch(Connection conn, String sql, List<Object[]> parameters, boolean insertFast, int batchSize, List<String> pkeys, List<Map<String, Object>> keyValues) {
        // validation
        if (batchSize < 1) {
            throw new IllegalArgumentException("The batchSize must more than 0.");
        }
        Assert.notNull(sql, "sql cannot be null.");
        // parameters may be null: the statement is then executed once with nothing bound.

        int size = parameters == null ? 0 : parameters.size();
        boolean isSingle = size <= 1;
        boolean isInsert = false;
        boolean returnKey = false;
        // NOTE(review): substring match — any statement whose text contains "insert" counts as an insert.
        if (StringUtils.containsIgnoreCase(sql, "insert")) {
            isInsert = true;
        }
        DBDialect dialect = DBUtils.doGetDialect(conn, false);
        PreparedStatement ps = null;
        int[] rows = new int[size == 0 ? 1 : size];
        // counter: entries in the current, not yet flushed batch; pointer: next free slot in rows.
        int counter = 0;
        int pointer = 0;
        LogContext log = new LogContext();
        if (size == 0) {
            log = OrmLog.commonLog(log, sql, parameters);
        } else {
            log.setStart(System.currentTimeMillis());
            log.setSql(sql);
        }
        try {
            if (CollectionUtils.isNotEmpty(pkeys) && isInsert && !insertFast && keyValues != null && !dialect.isNosql()) {
                if (DBMS.oracle.getNumber() == dialect.getNumber() || DBMS.postgresql.getNumber() == dialect.getNumber()) {
                    // Oracle and PostgreSQL: request the key columns by name.
                    ps = conn.prepareStatement(sql, pkeys.toArray(new String[pkeys.size()]));
                } else if (DBMS.sqlserver.getNumber() != dialect.getNumber()) {
                    // SQL Server needs an additional query to obtain the key values, so for it no
                    // generated-keys statement is prepared here; every other dialect uses the flag.
                    ps = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);
                }
                returnKey = true;
            }
            if (ps == null) {
                ps = conn.prepareStatement(sql);
            }

            if (CollectionUtils.isNotEmpty(parameters)) {
                int count = 0;
                for (Object[] parameter : parameters) {
                    log = OrmLog.batchCommonLog(null, log, sql, !isSingle, batchSize, (count++) + 1, parameter);
                    CommonSql.fillArrayStatement(ps, parameter);
                    if (!isSingle) {
                        ps.addBatch();
                        // Flush as soon as a full batch has accumulated.
                        if (++counter >= batchSize) {
                            counter = 0;
                            int[] r = ps.executeBatch();
                            ps.clearBatch();
                            for (int value : r) {
                                rows[pointer++] = value;
                            }
                            if (returnKey) {
                                setPkValueAfter(dialect, ps, r.length, keyValues);
                            }
                        }
                    }
                }
            }
            if (isSingle) {
                // Zero or one parameter set: a plain update, no JDBC batch involved.
                rows[0] = ps.executeUpdate();
                if (returnKey) {
                    setPkValueAfter(dialect, ps, 1, keyValues);
                }
            } else {
                // Flush the entries left over after the last full batch.
                if (counter > 0) {
                    int[] r = ps.executeBatch();
                    ps.clearBatch();
                    for (int i = 0; i < r.length; i++) {
                        int val = r[i];
                        rows[pointer++] = val;
                    }
                    if (returnKey) {
                        setPkValueAfter(dialect, ps, r.length, keyValues);
                    }
                }
            }
            OrmLog.resultSqlLog(null, log, rows, () -> OrmLog.getAutoCommit(conn));
        } catch (Exception e) {
            OrmLog.resultSqlLog(null, log, e);
            throw new SmallOrmException(e);
        } finally {
            DBUtils.closeQuietly(ps);
        }
        return rows;
    }

    /**
     * Collects the keys generated by the statement that was just executed into {@code keyValues}.
     * Dialects flagged {@code BATCH_GENERATED_KEY_ONLY_LAST} or {@code BATCH_GENERATED_KEY_BY_FUNCTION}
     * are handed to the specialised helpers; for all others every row of
     * {@code getGeneratedKeys()} becomes one case-insensitive map keyed by column label
     * (falling back to the column name when the label is null or empty).
     *
     * @param dialect   dialect of the connection
     * @param pst       the statement that was executed
     * @param total     number of rows written by the execution
     * @param keyValues receives one map per generated key row
     * @throws SmallOrmException wrapping any exception raised while reading the keys
     */
    private void setPkValueAfter(DBDialect dialect, PreparedStatement pst, int total, Collection<Map<String, Object>> keyValues) {
        if (dialect.has(Feature.BATCH_GENERATED_KEY_ONLY_LAST)) {
            // sqlite, h2 and derby take this route.
            setPkValueAfterGuess(pst, total, keyValues);
            return;
        }
        if (dialect.has(Feature.BATCH_GENERATED_KEY_BY_FUNCTION)) {
            // SQL Server needs special handling.
            setPkValueAfterFunction(dialect.getProperty(DBProperty.GET_IDENTITY_FUNCTION), pst, total, keyValues);
            return;
        }
        ResultSet generated = null;
        try {
            generated = pst.getGeneratedKeys();
            if (generated == null) {
                return;
            }
            final ResultSetMetaData meta = generated.getMetaData();
            final int columnCount = meta.getColumnCount();
            while (generated.next()) {
                final Map<String, Object> keyRow = new LinkedCaseInsensitiveMap<>(columnCount);
                for (int col = 1; col <= columnCount; col++) {
                    String label = meta.getColumnLabel(col);
                    if (label == null || label.isEmpty()) {
                        label = meta.getColumnName(col);
                    }
                    keyRow.put(label, generated.getObject(col));
                }
                keyValues.add(keyRow);
            }
        } catch (Exception e) {
            throw new SmallOrmException(e);
        } finally {
            DBUtils.closeQuietly(generated);
        }
    }

    /**
     * Guesses the generated keys when only one value is available from {@code getGeneratedKeys()}:
     * the first column of the first row is read as a {@code long} and treated as the key of the
     * last written row; the {@code total} keys are taken to be the consecutive values ending at it,
     * added in ascending order.
     *
     * @param pst       the statement that was executed
     * @param total     number of rows written by the execution
     * @param keyValues receives one single-entry map per guessed key
     * @throws SmallOrmException wrapping any exception raised while reading the key, including the
     *                           assertion failure when the generated-keys result set has no row
     */
    private void setPkValueAfterGuess(PreparedStatement pst, int total, Collection<Map<String, Object>> keyValues) {
        ResultSet generated = null;
        try {
            generated = pst.getGeneratedKeys();
            if (generated == null) {
                return;
            }
            final ResultSetMetaData meta = generated.getMetaData();
            String keyColumn = meta.getColumnLabel(1);
            if (keyColumn == null || keyColumn.isEmpty()) {
                keyColumn = meta.getColumnName(1);
            }
            Assert.isTrue(generated.next(), "The JDBC Driver may not support getGeneratedKeys() operation.");
            final long lastKey = generated.getLong(1);
            // Walk the distance to the last key down to zero so keys come out in ascending order.
            for (int distance = total - 1; distance >= 0; distance--) {
                long key = lastKey - distance;
                final Map<String, Object> keyRow = new LinkedCaseInsensitiveMap<>(1);
                keyRow.put(keyColumn, key);
                keyValues.add(keyRow);
            }
        } catch (Exception e) {
            throw new SmallOrmException(e);
        } finally {
            DBUtils.closeQuietly(generated);
        }
    }

    /**
     * Derives the generated keys from an identity query: {@code function} is executed on the
     * statement's connection, the first column of its first row is read as a {@code long} and
     * treated as the key of the last written row; the {@code total} keys are taken to be the
     * consecutive values ending at it, added in ascending order. Nothing is added when the
     * query returns no row.
     *
     * @param function  SQL text of the identity query
     * @param pst       the statement that was executed; only its connection is used
     * @param total     number of rows written by the execution
     * @param keyValues receives one single-entry map per derived key
     * @throws SmallOrmException wrapping any exception raised while running the identity query
     */
    private void setPkValueAfterFunction(String function, PreparedStatement pst, int total, Collection<Map<String, Object>> keyValues) {
        PreparedStatement identityQuery = null;
        ResultSet identityRow = null;
        try {
            identityQuery = pst.getConnection().prepareStatement(function);
            identityRow = identityQuery.executeQuery();
            if (!identityRow.next()) {
                return;
            }
            final ResultSetMetaData meta = identityRow.getMetaData();
            String keyColumn = meta.getColumnLabel(1);
            if (keyColumn == null || keyColumn.isEmpty()) {
                keyColumn = meta.getColumnName(1);
            }
            final long lastKey = identityRow.getLong(1);
            // Walk the distance to the last key down to zero so keys come out in ascending order.
            for (int distance = total - 1; distance >= 0; distance--) {
                long key = lastKey - distance;
                final Map<String, Object> keyRow = new LinkedCaseInsensitiveMap<>(1);
                keyRow.put(keyColumn, key);
                keyValues.add(keyRow);
            }
        } catch (Exception e) {
            throw new SmallOrmException(e);
        } finally {
            DBUtils.closeQuietly(identityRow);
            DBUtils.closeQuietly(identityQuery);
        }
    }

    /**
     * Statement creator that prepares a plain statement with {@code Connection.prepareStatement(sql)}.
     */
    static class DefaultPreparedStatement implements PreparedStatementCreator {
        @Override
        public PreparedStatement createPreparedStatement(Connection con, String sql) throws SQLException {
            final PreparedStatement plain = con.prepareStatement(sql);
            return plain;
        }
    }

    /**
     * Statement creator that requests a scroll-sensitive, updatable result set.
     */
    static class ForUpdatePreparedStatement implements PreparedStatementCreator {
        @Override
        public PreparedStatement createPreparedStatement(Connection con, String sql) throws SQLException {
            final int scrollType = ResultSet.TYPE_SCROLL_SENSITIVE;
            final int concurrency = ResultSet.CONCUR_UPDATABLE;
            return con.prepareStatement(sql, scrollType, concurrency);
        }
    }

    /**
     * Statement creator for streaming reads, with dialect-specific support.
     * <p>
     * When {@code OrmConfig}'s stream iterator option is on:
     * for PostgreSQL, auto-commit is switched off if it was on (a transaction must be open for
     * streaming reads to work there); for MySQL and Oracle the fetch size is set to
     * {@code Integer.MIN_VALUE}. In every other case the fetch size is set to 1000.
     */
    static class StreamPreparedStatement implements PreparedStatementCreator {
        @Override
        public PreparedStatement createPreparedStatement(Connection con, String sql) throws SQLException {
            final DBDialect dialect = DBUtils.doGetDialect(con, false);
            // PostgreSQL only streams inside a transaction.
            if (OrmConfig.getInstance().isOpenStreamIterator()
                    && PostgreSqlDialect.class.isAssignableFrom(dialect.getClass())
                    && con.getAutoCommit()) {
                con.setAutoCommit(false);
            }
            final PreparedStatement statement = con.prepareStatement(sql);
            // Enable streaming reads for MySQL and Oracle. For MySQL the connection must be used
            // exclusively while the stream is open, otherwise problems occur.
            final boolean streamRows = OrmConfig.getInstance().isOpenStreamIterator()
                    && (MySqlDialect.class.isAssignableFrom(dialect.getClass())
                    || OracleDialect.class.isAssignableFrom(dialect.getClass()));
            statement.setFetchSize(streamRows ? Integer.MIN_VALUE : 1000);
            return statement;
        }
    }

}
